Using Apache Spark

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).

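To run on a cluster, the SparkContext connects to a cluster manager (Spark's own standalone manager, YARN, Kubernetes, or Mesos), which allocates resources across applications.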

Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.

Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors.

Finally, SparkContext sends tasks to the executors to run.


In [ ]:
import pyspark
sc = pyspark.SparkContext(appName="my_spark_app")

In [ ]:
sc

Interactive programming is the practice of writing parts of a program while it is already running. The Jupyter Notebook will be the frontend for our running program.

For interactive programming we will have:

  • A Jupyter/IPython notebook: where we run Python code
  • PySparkShell application UI: to monitor the Spark cluster

Monitoring Spark Jobs

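Every SparkContext launches its own instance of the Web UI, which is available at http://[driver]:4040 by default, where [driver] is the host running the driver program. If port 4040 is already taken, for example by another SparkContext on the same host, Spark tries the next ports (4041, 4042, and so on).

The Web UI comes with the following tabs: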

* Jobs
* Stages
* Storage with RDD size and memory use
* Environment
* Executors
* SQL

By default, this information is available only while the application is running.

Jobs

  • Job id
  • Description
  • Submission date
  • Job Duration
  • Stages
  • Tasks

Stages

What is a stage?

A stage is a physical unit of execution. It is a step in a physical execution plan.

A stage is a set of parallel tasks, one per partition of an RDD, that compute partial results of a function executed as part of a Spark job.

In other words, a Spark job is a computation that is sliced into stages.

A stage is uniquely identified by its id. When a stage is created, DAGScheduler increments the internal counter nextStageId to track the number of stage submissions.

A stage can only work on the partitions of a single RDD (identified by rdd), but can be associated with many other dependent parent stages (via internal field parents), with the boundary of a stage marked by shuffle dependencies.
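
The way shuffle dependencies mark stage boundaries can be sketched in plain Python. This is a simplified model, not DAGScheduler's actual algorithm: the operation names are real RDD methods, but the `lineage` list, the `needs_shuffle` flag and the `split_into_stages` helper are assumptions made up for illustration.

```python
# Simplified model: each operation is (name, needs_shuffle).
# The splitting logic is an illustration only, not Spark's DAGScheduler.
lineage = [
    ("parallelize", False),
    ("map", False),
    ("filter", False),
    ("reduceByKey", True),   # shuffle dependency: a new stage starts here
    ("map", False),
    ("repartition", True),   # shuffle dependency: a new stage starts here
]

def split_into_stages(operations):
    """Group consecutive operations, starting a new stage at each shuffle."""
    stages = [[]]
    for name, needs_shuffle in operations:
        if needs_shuffle and stages[-1]:
            stages.append([])
        stages[-1].append(name)
    return stages

stages = split_into_stages(lineage)
for stage_id, ops in enumerate(stages):
    print(stage_id, ops)
```

In this model the six operations end up in three stages. In the Web UI, the Stages tab shows the real equivalent for each job.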

Storage

The Storage page shows how persisted RDDs are partitioned across the cluster.

Environment

This tab shows configuration and variables used in Apache Spark execution.

Executors

This tab shows information about the executors available in the cluster.

For each executor it reports CPU and memory usage, as well as RDD storage.

It also reports the tasks each executor has run.

Main Spark Concepts

Partitions

Spark’s basic abstraction is the Resilient Distributed Dataset, or RDD. An RDD is split into fragments, called partitions, that are distributed across the nodes of the cluster.

That fragmentation is what enables Spark to execute in parallel, and the level of fragmentation is determined by the number of partitions of your RDD.
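
The idea of one task per partition can be sketched without a cluster. The example below is a plain-Python model, not Spark code: `split_into_partitions` is a hypothetical helper, and a thread pool stands in for the executors.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(data, num_partitions):
    """Slice a list into num_partitions contiguous chunks of near-equal size."""
    n = len(data)
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    return [data[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]

data = list(range(1000))
partitions = split_into_partitions(data, 20)

# One task per partition: each task computes a partial result independently.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(sum, partitions))

# The partial results are combined into the final answer.
total = sum(partial_sums)
print(len(partitions), total)
```

More partitions allow more tasks to run at the same time, up to the number of workers available.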

Caching

You will often hear: "Spark handles all data in memory".

This is only partly true, and it is where the magic lies. Most of the time you work with metadata rather than with the data itself, and computations are deferred until you actually need the results.

Whether you store those results or recompute them each time has a high impact on response times. Storing the results is called caching the RDD.
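
The difference between recomputing and caching can be illustrated with a toy class. This is a minimal sketch in plain Python, not how Spark implements RDDs: `LazyDataset` and its `compute_count` attribute are hypothetical names chosen for this example.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, computes on demand."""

    def __init__(self, source, transforms=()):
        self._source = source
        self._transforms = list(transforms)
        self._cached = None
        self._cache_enabled = False
        self.compute_count = 0  # how many times the full pipeline ran

    def map(self, func):
        # Transformation: only metadata is recorded, nothing is computed.
        return LazyDataset(self._source, self._transforms + [func])

    def cache(self):
        # Mark the dataset so the first computed result is kept.
        self._cache_enabled = True
        return self

    def collect(self):
        # Action: run the recorded pipeline, or reuse the cached result.
        if self._cached is not None:
            return self._cached
        self.compute_count += 1
        result = list(self._source)
        for func in self._transforms:
            result = [func(x) for x in result]
        if self._cache_enabled:
            self._cached = result
        return result

uncached = LazyDataset(range(5)).map(lambda x: x * x)
uncached.collect()
uncached.collect()

cached = LazyDataset(range(5)).map(lambda x: x * x).cache()
cached.collect()
cached.collect()

print(uncached.compute_count, cached.compute_count)
```

The uncached dataset runs its pipeline on every `collect()`, while the cached one runs it once and then serves the stored result.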

Shuffling

(from: https://0x0fff.com/spark-architecture-shuffle/)

(more about shuffling: https://spark.apache.org/docs/1.3.1/programming-guide.html#performance-impact)

(best practices: https://robertovitillo.com/2015/06/30/spark-best-practices/)

Many tasks require shuffling data across the cluster. A table join is a typical example: to join two tables on the field “id”, all the rows with the same value of “id” in both tables must be stored in the same chunks.

Imagine two tables with integer keys ranging from 1 to 1,000,000. Storing the data in the same chunks means that, for example, the rows with keys 1-100 from both tables each sit in a single partition (chunk). Instead of scanning the whole second table for each partition of the first one, we can join partition with partition directly, because we know that keys 1-100 appear only in those two partitions. To achieve this, both tables must be partitioned the same way (the same number of partitions and the same mapping from key to partition), so the join requires far less computation. This is why shuffling matters.
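
A partition-wise join can be sketched in plain Python. The text above describes partitioning by key range; the sketch below swaps in hash partitioning (key modulo the number of partitions) because it is shorter to write. The tables and the helpers `hash_partition` and `join_partition` are made up for illustration and are not Spark APIs.

```python
def hash_partition(rows, num_partitions):
    """Shuffle step: send each (key, value) row to partition key % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in rows:
        partitions[key % num_partitions].append((key, value))
    return partitions

def join_partition(left_rows, right_rows):
    """Join two co-located partitions with a local hash lookup."""
    lookup = {}
    for key, value in right_rows:
        lookup.setdefault(key, []).append(value)
    return [(key, (lv, rv)) for key, lv in left_rows for rv in lookup.get(key, [])]

users = [(1, "ana"), (2, "bob"), (3, "cy"), (4, "dee")]
orders = [(1, "book"), (2, "lamp"), (3, "pen"), (3, "ink"), (5, "cup")]

# Both tables use the same number of partitions and the same key mapping.
num_partitions = 2
left = hash_partition(users, num_partitions)
right = hash_partition(orders, num_partitions)

# Partition i of one table only ever needs partition i of the other.
joined = []
for left_part, right_part in zip(left, right):
    joined.extend(join_partition(left_part, right_part))

print(sorted(joined))
```

Each pair of partitions is joined independently, so the pairs could be processed in parallel on different nodes.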

Exercises

(from: http://blog.insightdatalabs.com/jupyter-on-apache-spark-step-by-step/)

Exercise 1: Check that SparkContext is loaded in your current environment.

Exercise 2: Create your first RDD with 20 partitions and check in the Web UI that it has produced a job, a stage and 20 partitions (the job shows up only once an action runs on the RDD). The RDD must contain the 1000 integers starting from 0. Get the number of partitions using getNumPartitions().

(Hint 1: you can use sc.parallelize)

(Hint 2: check Spark API docs: http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.parallelize)

Exercise 3: Get 5 elements of the RDD.

Exercise 4: Name the RDD "my_rdd" and persist it in memory and on disk, serialized.

Exercise 5: Perform a transformation that groups the numbers into bins by their lowest hundred (0, 100, ..., 900) and counts the frequency of each bin.

Exercise 6: Browse the Web UI and:

* identify the RDD generated in Exercise X and its job
* identify the job in Exercise X 
* check that the RDD has been cached
* identify the job in Exercise X

Answer 1:


In [ ]:
# Check that the sc variable exists and is a SparkContext instance
print("Is SparkContext loaded?", isinstance(globals().get("sc"), pyspark.SparkContext))

Answer 2:


In [ ]:
# Distribute the integers 0..999 over 20 partitions
rdd = sc.parallelize(range(1000), 20)
rdd.getNumPartitions()

Answer 3:


In [ ]:
rdd.take(5)

Answer 4:


In [ ]:
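# PySpark always stores data serialized (pickled), so MEMORY_AND_DISK is
# already "memory and disk, serialized"; MEMORY_AND_DISK_SER is deprecated since Spark 2.0
rdd.setName("my_rdd").persist(pyspark.StorageLevel.MEMORY_AND_DISK)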

Answer 5:


In [ ]:
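# Floor division maps each number to the lowest hundred at or below it
# (round() would send 50..149 to the bin 100 instead)
rdd.map(lambda r: ((r // 100) * 100, 1))\
   .reduceByKey(lambda x, y: x + y)\
   .collect()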
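
As a sanity check that needs no cluster, the expected result of Exercise 5 can be computed in plain Python. This is only a sketch for comparison; the names `bins` and `expected` are introduced here for illustration.

```python
from collections import Counter

# Floor division maps each number to the lowest multiple of 100 at or below it.
bins = Counter((r // 100) * 100 for r in range(1000))

# Sort by bin so the result is easy to compare.
expected = sorted(bins.items())
print(expected)
```

There should be ten bins (0, 100, ..., 900) with 100 numbers each. Spark's collect() may return the pairs in a different order, so sort its output before comparing.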
